
Support Hadoop SequenceFiles Scan #14061

Draft · thirtiseven wants to merge 48 commits into NVIDIA:main from thirtiseven:seq_file_reader

Conversation

Collaborator

@thirtiseven thirtiseven commented Dec 23, 2025

Closes #14065

Description

This PR:

  • registers a new file format com.nvidia.spark.rapids.SequenceFileBinaryFileFormat
  • and adds a reader for it

to read Hadoop Sequence Files to GPU:

spark.read.format("com.nvidia.spark.rapids.SequenceFileBinaryFileFormat").load(logPath)

Performance tests

val NUM_FILES = 200
val RECORDS_PER_FILE = 50000
val VALUE_SIZE = 1024
val ITERATIONS = 5

I ran performance tests on 200 files with 50,000 records each and a 1 KB value size, over 5 iterations.

script

Since the decode happens on the CPU, the GPU file format gets performance numbers similar to the CPU file format, even though it must also copy the data to the GPU. Both are about 2x faster than the CPU RDD scan.

CPU value-only            |          1.24 s |           2.00x
GPU MT-8 threads          |          1.25 s |           1.98x
GPU MT-4 threads          |          1.27 s |           1.95x
GPU MT-2 threads          |          1.28 s |           1.94x
GPU MULTITHREADED         |          1.28 s |           1.94x
CPU FileFormat            |          1.35 s |           1.84x
GPU AUTO                  |          1.37 s |           1.82x
GPU PERFILE               |          1.37 s |           1.81x
GPU value-only            |          1.38 s |           1.80x
GPU COALESCING            |          1.41 s |           1.76x
RDD Scan                  |          2.49 s |        baseline
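The benchmark script itself is not inlined above; structurally it is a warm-up-then-measure loop over the parameters listed. A minimal sketch of such a harness follows; `runScan` is a placeholder for whatever action forces the scan (e.g. a count), not the actual script:

```scala
// Generic micro-benchmark harness: one unmeasured warm-up run, then
// time `iterations` runs and report the best (minimum) wall time.
// Illustrative only -- not the benchmark script used in this PR.
object BenchHarness {
  def timeBest(iterations: Int)(runScan: => Unit): Double = {
    require(iterations > 0)
    runScan // warm-up, not measured
    val times = (1 to iterations).map { _ =>
      val t0 = System.nanoTime()
      runScan
      (System.nanoTime() - t0) / 1e9 // seconds
    }
    times.min
  }

  def main(args: Array[String]): Unit = {
    val best = timeBest(5) { Thread.sleep(1) } // dummy workload
    println(f"best: $best%.3f s")
  }
}
```

Reporting the minimum rather than the mean is a common choice for scan benchmarks, since it filters out JIT and page-cache noise.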

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for reading Hadoop SequenceFiles in the RAPIDS Accelerator for Apache Spark. It registers a new file format sequencefilebinary that reads SequenceFile key/value pairs as raw BinaryType columns on the GPU.

Key Changes

  • Introduces SequenceFileBinaryFileFormat as a new DataSource that reads SequenceFiles and exposes key/value as BinaryType columns
  • Implements GPU-accelerated reading via GpuReadSequenceFileBinaryFormat and associated partition readers
  • Integrates the new format into GpuFileSourceScanExec for GPU execution path routing

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.

Show a summary per file
File Description
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormat.scala CPU-side FileFormat implementation with row-based reader for SequenceFiles
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadSequenceFileBinaryFormat.scala GPU-enabled FileFormat wrapper with metadata support and multi-file reader factory
sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala Core GPU partition readers with host-side buffering and device column materialization
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala Integration point registering SequenceFileBinary format in GPU scan execution
tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala Test suite in tests module for wildcard discovery
sql-plugin/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala Duplicate test suite in sql-plugin module


Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

@Greptile full review

@greptile-apps
Contributor

greptile-apps bot commented Dec 23, 2025

Greptile Summary

This PR adds GPU-accelerated reading of Hadoop SequenceFiles by introducing a new GpuSequenceFileSerializeFromObjectExec that replaces SerializeFromObjectExec nodes backed by SequenceFile RDD scans at the physical plan level, and a supporting MultiFileCloudSequenceFilePartitionReader that reads files in parallel using a thread pool with an optional zero-copy GPU-concat combine mode. Performance benchmarks show ~2× speedup over the RDD baseline.

Key changes:

  • New GpuSequenceFileSerializeFromObjectExecMeta detects SequenceFile RDD lineage via reflection and gates on a planning-time compression check, falling back to CPU for compressed inputs.
  • New HostBinaryListBufferer and MultiFileCloudSequenceFilePartitionReader handle CPU-side decoding and H2D transfer of BytesWritable payloads into cuDF LIST<UINT8> device columns.
  • Three new RapidsConf keys control reader type, parallelism cap, and enable/disable of the physical replacement.
  • Integration and unit test coverage for basic reads, empty files, large batches, binary data, and missing/corrupt file handling.
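For the three new RapidsConf keys mentioned above, the summary does not show the actual key names. Purely as an illustration of how such knobs are typically surfaced (these names are guesses; the real ones are defined in RapidsConf.scala):

```
# Hypothetical key names -- see RapidsConf.scala in the diff for the real ones
--conf spark.rapids.sql.format.sequenceFile.reader.type=MULTITHREADED
--conf spark.rapids.sql.format.sequenceFile.multiThreadedRead.maxNumFilesParallel=8
--conf spark.rapids.sql.format.sequenceFile.rddReplace.enabled=true
```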

Issues found:

  • Data-loss risk: UnsupportedOperationException (thrown for compressed files at execution time) extends RuntimeException and is caught by the ignoreCorruptFiles handler, silently dropping compressed SequenceFiles when spark.sql.files.ignoreCorruptFiles=true. This is the most significant issue.
  • Unreliable cycle detection: isSimpleSequenceFileRDD uses System.identityHashCode for the seen set — identity hash codes are not unique across JVM objects and can collide, incorrectly short-circuiting RDD lineage traversal and causing unnecessary CPU fallbacks.
  • Double-close of SequenceFile.Reader: The closeOnExcept(reader) inner block and the outer try-finally both call reader.close(); the redundant close should be removed.
  • Header version not checked in isCompressedSequenceFile: The isBlockCompressed boolean field is absent in SequenceFile format versions earlier than 5; reading it unconditionally can produce incorrect compression detection for old files.

Confidence Score: 2/5

  • Not safe to merge as-is due to a data-loss risk when ignoreCorruptFiles=true encounters compressed SequenceFiles at execution time.
  • The UnsupportedOperationException thrown for compressed files is silently swallowed as a "corrupted file" when ignoreCorruptFiles=true, dropping records without surfacing an error. While the planning-time check reduces the likelihood, it is not exhaustive (samples only one file per path), making data loss possible in production. The remaining issues (identity-hash cycle detection, double-close, old SequenceFile version header) are lower-severity but still warrant fixes before merge.
  • sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala (ignoreCorruptFiles + double-close) and sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala (identityHashCode cycle detection + SequenceFile header parsing).

Important Files Changed

Filename Overview
sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala Core multi-threaded SequenceFile reader: contains two notable bugs — compressed files are silently dropped when ignoreCorruptFiles=true (UnsupportedOperationException caught as RuntimeException), and SequenceFile.Reader is double-closed in doRead(). Also ships extensive logDebug instrumentation at object-construction time that was previously flagged.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala Planning-time meta that identifies SequenceFile RDD scans via reflection. Cycle detection uses System.identityHashCode (non-unique, can collide) instead of proper reference tracking; and isCompressedSequenceFile does not guard the isBlockCompressed read against pre-v5 format files. Both are logic issues though with low probability impact.
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuSequenceFileSerializeFromObjectExec.scala GPU exec node that replaces SerializeFromObjectExec for SequenceFile RDD scans; correctly falls back to CPU doExecute() for the non-GPU path, and lazily enumerates files on the driver. Overall clean implementation with no critical issues.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala Adds three new SequenceFile config keys (reader type, max parallel files, RDD physical replace enable). The isSequenceFileMultiThreadReadEnabled lazy val correctly rejects COALESCING and silently re-maps PERFILE to MULTITHREADED with a warning. Minor: PERFILE deprecation path is silent except for a warning; no test covers it explicitly.
integration_tests/src/main/python/sequencefile_test.py Comprehensive integration test suite covering basic reads, empty files, large batches, binary data, missing/corrupt files, and combine mode. No critical issues; the corrupt-file test correctly expects an error even with ignoreCorruptFiles=true for RDD-path reads.
tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala Unit tests for RDD physical replacement and format suite; covers physical-replace enable/disable, schema handling, and compression rejection. No critical issues found.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala Registers SerializeFromObjectExec with the GPU overrides framework using the new meta class. Change is minimal and follows the established pattern exactly.

Sequence Diagram

sequenceDiagram
    participant Driver
    participant Meta as GpuSequenceFileSerializeFromObjectExecMeta
    participant Exec as GpuSequenceFileSerializeFromObjectExec
    participant Factory as GpuSequenceFileMultiFilePartitionReaderFactory
    participant Reader as MultiFileCloudSequenceFilePartitionReader
    participant Hadoop as SequenceFile.Reader (CPU)
    participant GPU as cuDF / GPU

    Driver->>Meta: tagPlanForGpu()
    Meta->>Meta: isSimpleSequenceFileRDD() [reflection]
    Meta->>Meta: isCompressedSequenceFile() [header peek]
    Meta-->>Driver: willNotWorkOnGpu / OK

    Driver->>Exec: internalDoExecuteColumnar()
    Exec->>Factory: build reader factory
    Exec->>Reader: GpuDataSourceRDD per FilePartition

    loop For each PartitionedFile (threaded)
        Reader->>Hadoop: SequenceFile.Reader.nextRaw()
        Hadoop-->>Reader: raw key/value bytes
        Reader->>Reader: HostBinaryListBufferer.addBytes()
    end

    alt combineConf active (small files)
        Reader->>Reader: combineHMBs() — zero-copy chunk aggregation
    end

    Reader->>GPU: buildDeviceColumnFromChunks() → ColumnVector.concatenate()
    GPU-->>Exec: ColumnarBatch (LIST<UINT8> key + value)
    Exec-->>Driver: Iterator[ColumnarBatch]

Comments Outside Diff (4)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 1283-1291 (link)

    Compressed files silently dropped when ignoreCorruptFiles=true

    UnsupportedOperationException (thrown at line 1308 for compressed SequenceFiles) extends RuntimeException, so it will be caught by the ignoreCorruptFiles handler here. This means any compressed SequenceFile that is not detected at planning time (e.g., a newly-written compressed file added after planning) will be silently dropped with a "Skipped corrupted file" warning rather than surfacing a proper error.

    The planning-time check in tagPlanForGpu inspects only a sample file per path, so it is possible for compressed files to reach execution. When ignoreCorruptFiles=true, those files would be silently excluded from the result set — a data-correctness issue.

    The fix is to not classify UnsupportedOperationException as a "corrupt file" condition:

      try {
        doRead()
      } catch {
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${partFile.filePath}", e)
          SequenceFileEmptyMetaData(partFile, 0L)
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
    case e: UnsupportedOperationException => throw e   // don't swallow unsupported ops
        case e@(_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
          logWarning(s"Skipped corrupted file: ${partFile.filePath}", e)
          SequenceFileEmptyMetaData(partFile, 0L)
      }
  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala, line 530-543 (link)

    System.identityHashCode is unreliable for cycle/visited tracking

    System.identityHashCode does not guarantee uniqueness — two distinct RDD objects can return the same value (birthday-problem collision probability ≈ 1 in 2³²). When a collision occurs, a legitimate RDD in the lineage chain is silently misidentified as "already seen" and traversal is cut short, causing isSimpleSequenceFileRDD to return false for a valid SequenceFile RDD. The consequence is an unnecessary fallback to the CPU path.

    The correct approach is to track actual object identity using a java.util.IdentityHashMap-backed set, e.g.:

    import java.util.{Collections, IdentityHashMap}
    
    def isSimpleSequenceFileRDD(
        rdd: RDD[_],
    seen: java.util.Set[RDD[_]] = Collections.newSetFromMap(new IdentityHashMap[RDD[_], java.lang.Boolean]())
    ): Boolean = {
      if (!seen.add(rdd)) return false   // already visited (true reference equality)
      rdd match {
        case n: NewHadoopRDD[_, _] => isNewApiSequenceFileRDD(n)
        case h: HadoopRDD[_, _]    => isOldApiSequenceFileRDD(h)
        case other =>
          if (other.dependencies.size != 1) false
          else isSimpleSequenceFileRDD(other.dependencies.head.rdd, seen)
      }
    }
  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 1296-1315 (link)

    Redundant closeOnExcept causes double-close of SequenceFile.Reader

    The outer try { ... } finally { reader.close() } block already guarantees cleanup in all exit paths. The inner closeOnExcept(reader) { _ => ... } is therefore redundant: when the lambda throws (e.g., UnsupportedOperationException), closeOnExcept closes reader and re-throws; then the finally block closes it a second time.

    While SequenceFile.Reader.close() is idempotent in practice, the pattern is misleading and inconsistent with the rest of the reader code that relies solely on finally/withResource for single-close guarantees. The closeOnExcept here can simply be removed:

    val reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path))
    try {
      if (reader.isCompressed || reader.isBlockCompressed) {
        val compressionType = reader.getCompressionType
        throw new UnsupportedOperationException(
          s"SequenceFile reader does not support compressed SequenceFiles " +
          s"(compressionType=$compressionType), file=$path")
      }
      val start = partFile.start
      if (start > 0) {
        reader.sync(start)
      }
      val end = partFile.start + partFile.length
      // ... rest of doRead
    } finally {
      reader.close()
    }
  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala, line 606-628 (link)

    isCompressedSequenceFile assumes a specific SequenceFile header layout that may be incorrect for older format versions

    The method reads the header as: 3-byte magic (SEQ) + 1 version byte + two Text.readString() calls (key/value class names) + isCompressed boolean + isBlockCompressed boolean.

    The isBlockCompressed field was only introduced in SequenceFile format version 5. For files written with version 4 (a legitimate older format), reading the second readBoolean() consumes a byte that actually belongs to the key-class-name or sync-marker area, producing an undefined result. In the best case NonFatal catches the resulting IOException; in the worst case it returns true for a non-compressed file, incorrectly disabling GPU acceleration.

    Consider guarding with an explicit version check:

    val magic = new Array[Byte](4)
    in.readFully(magic)
    if (!(magic(0) == 'S' && magic(1) == 'E' && magic(2) == 'Q')) {
      false
    } else {
      val version = magic(3) & 0xFF
      org.apache.hadoop.io.Text.readString(in)  // key class
      org.apache.hadoop.io.Text.readString(in)  // value class
      val isCompressed = in.readBoolean()
      val isBlockCompressed = if (version >= 5) in.readBoolean() else false
      isCompressed || isBlockCompressed
    }

Last reviewed commit: ed7fa84

@greptile-apps greptile-apps bot left a comment

Additional Comments (4)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 125-128 (link)

    logic: check for potential INT32 overflow before it happens

    the check happens after dataLocation has already grown beyond Int.MaxValue, which could cause issues during the buffer growth operations in addBytes or addValueBytes. move the overflow check earlier in those methods before updating dataLocation.

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 73-83 (link)

    logic: potential integer overflow in row capacity calculation

    rowsAllocated * 2 can overflow when rowsAllocated is close to Int.MaxValue / 2. this causes the allocation to wrap to negative or small values.

  3. tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala, line 32-136 (link)

    style: missing test coverage for key scenarios

    add tests for:

    • compressed SequenceFiles (should throw UnsupportedOperationException)
    • multi-file reads to verify the multi-file reader path
    • large batches that exceed maxRowsPerBatch or maxBytesPerBatch
    • partition columns
    • reading only key or only value (not both)
    • empty files

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

  4. tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala, line 102-136 (link)

    style: tests only verify the CPU reader path, not GPU

    this test uses SequenceFileBinaryFileFormat which is the CPU fallback. to test the GPU path with GpuReadSequenceFileBinaryFormat, you'd need to enable the Rapids plugin configuration and verify GPU execution.
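The first two comments both concern Int overflow during geometric buffer growth. A defensive pattern is to validate the new size before committing any state change; a generic sketch (not the plugin's actual HostBinaryListBufferer code):

```scala
// Growable-buffer bookkeeping that detects Int overflow *before*
// mutating any state, and clamps doubling instead of wrapping negative.
// Illustrative only -- not the actual HostBinaryListBufferer.
final class SafeSizer(initialCapacity: Int = 16) {
  private var capacity: Int = initialCapacity
  private var used: Int = 0

  /** Reserve `n` more bytes, growing capacity geometrically. */
  def reserve(n: Int): Unit = {
    require(n >= 0)
    // Math.addExact throws on overflow, so we can reject the request
    // cleanly before touching `used` or `capacity`.
    val needed =
      try Math.addExact(used, n)
      catch {
        case _: ArithmeticException =>
          throw new IllegalStateException(
            s"buffer would exceed Int.MaxValue: $used + $n")
      }
    while (capacity < needed) {
      // `capacity * 2` can itself overflow: clamp at Int.MaxValue.
      capacity = if (capacity > Int.MaxValue / 2) Int.MaxValue else capacity * 2
    }
    used = needed
  }

  def size: Int = used
}
```

The same pre-check idea applies to the `rowsAllocated * 2` growth in the second comment: compare against `Int.MaxValue / 2` before multiplying rather than inspecting the product afterwards.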

5 files reviewed, 4 comments


@sameerz sameerz requested a review from mythrocks December 23, 2025 21:57
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 12 comments.



Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 13 comments.



Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

@greptileai full review

This reverts commit e6322bc.
This reverts commit f9f4a8c.
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

@greptileai full review

@greptile-apps greptile-apps bot left a comment

8 files reviewed, 1 comment


Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.



Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@revans2
Collaborator

revans2 commented Jan 21, 2026

I am a little concerned about the general direction of this PR. If the RDDScan is a problem, then perhaps we should be trying to understand how to address that in a generic way instead of the sequence file reader code here. My main concern with the sequence file reader code is that there is a code change required for the user. No code change is a fundamental principle of this project, and I am concerned that we are violating it. This makes it much harder for a customer to switch over, and also harder for a customer to fall back to the CPU if they need to.

@thirtiseven
Collaborator Author

I am a little concerned about the general direction of this PR. If the RDDScan is a problem, then perhaps we should be trying to understand how to address that in a generic way instead of the sequence file reader code here. My main concern with the sequence file reader code is that there is a code change required for the user. No code change is a fundamental principle of this project, and I am concerned that we are violating it. This makes it much harder for a customer to switch over, and also harder for a customer to fall back to the CPU if they need to.

The customer is open to changing how they read the files. But I totally agree that we should keep the no-code-change principle here. It should be possible to introduce a rule that converts the RDDScan to a GPU file reader without changing the user code. I will work on it.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@nvauto
Collaborator

nvauto commented Jan 26, 2026

NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

@greptileai full review

@greptile-apps greptile-apps bot left a comment

12 files reviewed, no comments


Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

@greptileai full review


Successfully merging this pull request may close these issues.

[FEA] Support reading Hadoop SequenceFiles on GPU

4 participants